Depending on your requirements, when computing hourly metrics that involve
distinct cardinality, a much more scalable method is to use a HyperLogLog
data structure.
A Scala implementation that people have used with Spark is Twitter's Algebird:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
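A rough sketch of the idea (assuming algebird-core is on the classpath; `hourlyUserIds` is an illustrative name, and you should check the HyperLogLogMonoid API against your Algebird version):

```scala
import com.twitter.algebird.{HyperLogLogMonoid, HLL}

// 12 bits of precision gives roughly 1-2% standard error; tune as needed.
val hllMonoid = new HyperLogLogMonoid(12)

// One HLL sketch per hourly partition of IDs (hourlyUserIds is hypothetical).
val hourlySketches: Seq[HLL] =
  hourlyUserIds.map { ids =>
    hllMonoid.sum(ids.map(id => hllMonoid.create(id.getBytes("UTF-8"))))
  }

// Sketches merge cheaply, so the overall distinct count is just another sum --
// no need to union the raw data and run a full distinct.
val overall: HLL = hllMonoid.sum(hourlySketches)
println(overall.approximateSize.estimate)
```

Because the merge is associative, the hourly sketches can be kept around and re-combined over any window without touching the raw data again.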
On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <
suren.hiraman@velos.io> wrote:
> Vivek,
>
> If the foldByKey solution doesn't work for you, my team uses
> RDD.persist(DISK_ONLY) to avoid OOM errors.
>
> It's slower, of course, and requires tuning other config parameters. It
> can also be a problem if you do not have enough disk space, meaning that
> you have to unpersist at the right points if you are running long flows.
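> A rough sketch of this pattern (the RDD names are illustrative, not from
> our actual code):

```scala
import org.apache.spark.storage.StorageLevel

// Spill this RDD to local disk instead of holding it in executor memory.
val grouped = pairs.groupByKey().persist(StorageLevel.DISK_ONLY)

// ... downstream stages that reuse `grouped` go here ...

// Free the disk space once the long flow no longer needs this RDD.
grouped.unpersist()
```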
>
> For us, even though the disk writes are a performance hit, we prefer the
> Spark programming model to Hadoop M/R. But we are still working on getting
> this to work end to end on 100s of GB of data on our 16-node cluster.
>
> Suren
>
>
>
> On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS <vivek.ys@gmail.com> wrote:
>
>> Thanks for the input. I will give foldByKey a shot.
>>
>> The way I am doing it is: the data is partitioned hourly, so I compute
>> distinct values hourly. Then I use unionRDD to merge them and compute
>> distinct on the overall data.
>>
>> > Is there a way to know which (key, value) pair is resulting in the OOM?
>> > Is there a way to set parallelism in the map stage so that each worker
>> > will process one key at a time?
>>
>> I didn't realise countApproxDistinctByKey uses HyperLogLogPlus. This
>> should be interesting.
>>
>> Vivek
>>
>>
>> On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen <sowen@cloudera.com> wrote:
>>
>>> Grouping by key is always problematic since a key might have a huge
>>> number of values. You can do a little better than grouping *all* values and
>>> *then* finding distinct values by using foldByKey, putting values into a
>>> Set. At least you end up with only distinct values in memory. (You don't
>>> need two maps either, right?)
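>>> Concretely, the foldByKey approach I mean looks like the sketch below.
>>> (For heavily skewed keys the per-key Set can still grow large; this only
>>> avoids buffering duplicate values.)

```scala
// Accumulate each key's values into a Set, so only distinct values are
// kept per key, then count them. Assumes pairs: RDD[(String, String)].
val distinctCounts = pairs
  .mapValues(v => Set(v))
  .foldByKey(Set.empty[String])(_ ++ _)
  .mapValues(_.size)
```

aggregateByKey would avoid allocating a singleton Set per element, at the cost of slightly more code.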
>>>
>>> If the number of distinct values is still huge for some keys, consider
>>> the experimental method countApproxDistinctByKey:
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285
>>>
>>> This should be much more performant at the cost of some accuracy.
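>>> For example (relativeSD is the target relative accuracy; smaller values
>>> cost more memory per key):

```scala
// Approximate distinct-value count per key via HyperLogLog -- no Sets of
// raw values are ever materialized in memory.
val approxCounts: RDD[(String, Long)] =
  pairs.countApproxDistinctByKey(relativeSD = 0.05)
```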
>>>
>>>
>>> On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS <vivek.ys@gmail.com> wrote:
>>>
>>>> Hi,
>>>> For last couple of days I have been trying hard to get around this
>>>> problem. Please share any insights on solving this problem.
>>>>
>>>> Problem :
>>>> There is a huge list of (key, value) pairs. I want to transform this to
>>>> (key, distinct values) and then eventually to (key, distinct values count)
>>>>
>>>> On small dataset
>>>>
>>>> groupByKey().map(x => (x._1, x._2.distinct)) ... map(x => (x._1,
>>>> x._2.distinct.count))
>>>>
>>>> On large data set I am getting OOM.
>>>>
>>>> Is there a way to represent Seq of values from groupByKey as RDD and
>>>> then perform distinct over it ?
>>>>
>>>> Thanks
>>>> Vivek
>>>>
>>>
>>>
>>
>
>
> 
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@velos.io
> W: www.velos.io
>
>
